package org.snake.nebulae.core.task;

import cn.hutool.core.date.DateUtil;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.*;
import org.snake.nebulae.core.common.DefaultMqttFactory;
import org.snake.nebulae.core.common.DispatcherHandle;
import org.snake.nebulae.core.exception.ServiceException;
import org.snake.nebulae.core.model.MqttFailureHolder;
import org.snake.nebulae.core.model.MqttMessageHolder;
import org.snake.nebulae.core.model.ServiceInfo;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CompletableFuture;

@Component
@Slf4j
public class CoreServiceTask implements Runnable {

    protected ServiceInfo serviceInfo = new ServiceInfo();

    @Resource
    protected DefaultMqttFactory defaultMqttFactory;

    protected MqttAsyncClient mqttAsyncClient;

    @Resource
    protected ObjectMapper objectMapper;

    @Resource
    protected DispatcherHandle dispatcherHandle;

    protected Class<?> self;

    /**
     * 服务启动成功
     */
    protected CompletableFuture<IMqttToken> serviceStartSuccess = new CompletableFuture<>();

    /**
     * 服务启动失败
     */
    protected CompletableFuture<MqttFailureHolder> serviceStartError = new CompletableFuture<>();

    /**
     * 服务处理之前
     */
    protected CompletableFuture<MqttMessageHolder> serviceHandleBefore = new CompletableFuture<>();

    /**
     * 服务处理之后
     */
    protected CompletableFuture<MqttMessageHolder> serviceHandleAfter = new CompletableFuture<>();

    public CoreServiceTask() {
        createServiceInfo();
    }

    public CompletableFuture<IMqttToken> getServiceStartSuccess() {
        return serviceStartSuccess;
    }

    @PostConstruct
    public void init() {
        mqttAsyncClient = defaultMqttFactory.choseOneClient();

        doRegisterServiceHandle();

        self = this.getClass();
    }

    @Override
    public void run() {
        doRegisterService();

        try {
            // 获取当前服务列表
            IMqttActionListener serviceAction = new IMqttActionListener() {
                @Override
                public void onSuccess(IMqttToken asyncActionToken) {
                    log.info("{} 服务 {} 启动完成 监听服务地址 {} 时间是 {}", self, serviceInfo.getName(), serviceInfo.getServicePath() + "/#", DateUtil.now());
                    serviceStartSuccess.complete(asyncActionToken);
                }

                @Override
                public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
                    log.info("{} 服务 {} 启动失败 时间是{} 原因是 {}", self, serviceInfo.getName(), DateUtil.now(), exception.getMessage());
                }
            };

            IMqttMessageListener serviceListener = (topic, message) -> {
                // 重复请求的原因在于主题的模糊匹配导致
                // 是否需要跳过重复请求
                if (doHandleRepeat(topic, message)) {
                    return;
                }

                MqttMessageHolder mqttMessageHolder = new MqttMessageHolder(topic, message);
                serviceHandleBefore.complete(mqttMessageHolder);

                // 处理具体的业务逻辑
                dispatcherHandle.request(topic, message);

                serviceHandleAfter.complete(mqttMessageHolder);
            };
            mqttAsyncClient.subscribe(serviceInfo.getServicePath() + "/#", 2, this, serviceAction, serviceListener);
        } catch (MqttException e) {
            log.error("{} 服务启动时时发生错误 {} {}", self, serviceInfo.getServicePath(), e.getMessage());
            serviceStartError.completeExceptionally(new ServiceException(e.getMessage()));
        }
    }

    /**
     * 创建服务信息
     */
    public void createServiceInfo() {

    }

    /**
     * 子类注册服务处理器
     */
    public void doRegisterServiceHandle() {

    }

    /**
     * 注册服务
     */
    @SneakyThrows
    public void doRegisterService() {
        byte[] bytes = objectMapper.writeValueAsBytes(serviceInfo);
        mqttAsyncClient.publish(CoreTask.topicServices, bytes, 2, false);

        doPluginTask();
    }

    /**
     * 挂在插件任务
     */
    public void doPluginTask() {

    }

    /**
     * 跳过重复处理
     *
     * @param topic   主题
     * @param message 消息
     * @return 是否处理过
     */
    public boolean doHandleRepeat(String topic, MqttMessage message) {
        // 模糊主题匹配解决重复处理
        if (topic.contains(CoreTask.topicResponseExt)) {
            log.info("跳过重复处理的请求 {}", new String(message.getPayload()));
            return true;
        }
        return false;
    }
}
